/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server;

import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.proto.PrepareDataRequest;
import org.apache.zookeeper.server.paxos2.Configuration;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;

/**
 * This class implements a simple standalone ZooKeeperServer. It sets up the
 * following chain of RequestProcessors to process requests:
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
 */
public class ZooKeeperServer implements ServerStats.Provider {

	/** Logger for this class; protected, so subclasses can use it too. */
	protected static final Logger LOG = Logger.getLogger(ZooKeeperServer.class);

	static {
		// Runs once at class initialization, right after LOG is assigned.
		Environment.logEnv("Server environment:", LOG);
	}

	/**
	 * Factory for ZooKeeper server instances and for the connection factory
	 * that accepts client connections.
	 */
	public static interface Factory {
		/**
		 * @return a ZooKeeperServer instance
		 * @throws IOException
		 *             declared for implementations that perform I/O
		 */
		ZooKeeperServer createServer() throws IOException;

		/**
		 * @return a connection factory
		 * @throws IOException
		 *             declared for implementations that perform I/O
		 */
		NIOServerCnxn.Factory createConnectionFactory() throws IOException;
	}

	/**
	 * Strategy for producing the DataTree a server works with.
	 */
	public interface DataTreeBuilder {
		/** @return the data tree produced by this builder */
		DataTree build();
	}

	/**
	 * Builder that returns a new DataTree, created with its no-argument
	 * constructor, on every call.
	 */
	public static class BasicDataTreeBuilder implements DataTreeBuilder {
		public DataTree build() {
			final DataTree tree = new DataTree();
			return tree;
		}
	}

	/** Tick time used when the caller does not supply one. */
	public static final int DEFAULT_TICK_TIME = 3000;

	/** Tick time; the default min/max session timeouts are multiples of it. */
	protected int tickTime = DEFAULT_TICK_TIME;

	/** value of -1 indicates unset, use default */
	protected int minSessionTimeout = -1;

	/** value of -1 indicates unset, use default */
	protected int maxSessionTimeout = -1;

	/** Snapshot / transaction log access, used when taking snapshots. */
	private FileTxnSnapLog txnLogFactory = null;

	/** Neither read nor assigned anywhere in this class. */
	private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;

	/** Database backing this server; may be created lazily by startdata(). */
	private ZKDatabase zkDb;

	/** Highest zxid handed out so far; accessed via the synchronized zxid methods. */
	protected long hzxid = 0;

	public static final Exception ok = new Exception("No prob");

	/** Head of the request processor chain; null until setupRequestProcessors() runs. */
	protected RequestProcessor firstProcessor;

	/** Set to true by startup() and to false by shutdown(). */
	protected volatile boolean running;

	/**
	 * This is the secret that we use to generate passwords, for the moment it
	 * is more of a sanity check.
	 */
	private static final long superSecret = 0xB3415C00L;

	/** Counter maintained by incInProcess() / decInProcess(). */
	int requestsInProcess;

	private NIOServerCnxn.Factory serverCnxnFactory;

	private final ServerStats serverStats;

	/**
	 * Creates a ZooKeeperServer instance. Nothing is set up apart from the
	 * server statistics; use the setX methods to prepare the instance (eg
	 * transaction log factory, database, ticktime, etc...)
	 */
	public ZooKeeperServer() {
		this.serverStats = new ServerStats(this);
	}

	/**
	 * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
	 * actually start listening for clients until run() is invoked.
	 * 
	 * @throws IOException
	 */
	public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
			int minSessionTimeout, int maxSessionTimeout,
			DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
		serverStats = new ServerStats(this);
		this.txnLogFactory = txnLogFactory;
		this.zkDb = zkDb;
		this.tickTime = tickTime;
		this.minSessionTimeout = minSessionTimeout;
		this.maxSessionTimeout = maxSessionTimeout;

		LOG.info("Created server with tickTime " + tickTime
				+ " minSessionTimeout " + getMinSessionTimeout()
				+ " maxSessionTimeout " + getMaxSessionTimeout() + " datadir "
				+ txnLogFactory.getDataDir() + " snapdir "
				+ txnLogFactory.getSnapDir());
	}

	/**
	 * Creates a ZooKeeperServer instance with both session timeouts left
	 * unset (-1) and a new ZKDatabase built on the given transaction log
	 * factory.
	 * 
	 * @param txnLogFactory
	 *            the file transaction snapshot logging class
	 * @param tickTime
	 *            the ticktime for the server
	 * @param treeBuilder
	 *            the datatree builder
	 * @throws IOException
	 */
	public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
			DataTreeBuilder treeBuilder) throws IOException {
		this(txnLogFactory,
				tickTime,
				-1,
				-1,
				treeBuilder,
				new ZKDatabase(txnLogFactory));
	}

	/**
	 * @return the statistics object created for this server at construction
	 */
	public ServerStats serverStats() {
		return this.serverStats;
	}

	/**
	 * Writes the server configuration to the given writer as one
	 * {@code key=value} line per setting.
	 * 
	 * If no connection factory has been set, clientPort and maxClientCnxns
	 * are both reported as -1 (the same sentinel getClientPort() uses)
	 * instead of failing with a NullPointerException.
	 * 
	 * @param pwriter
	 *            destination of the configuration dump
	 */
	public void dumpConf(PrintWriter pwriter) {
		pwriter.print("clientPort=");
		pwriter.println(getClientPort());
		pwriter.print("dataDir=");
		pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath());
		pwriter.print("dataLogDir=");
		pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath());
		pwriter.print("tickTime=");
		pwriter.println(getTickTime());
		pwriter.print("maxClientCnxns=");
		if (serverCnxnFactory != null) {
			pwriter.println(serverCnxnFactory.getMaxClientCnxns());
		} else {
			// No connection factory attached: report "unset" rather than throw.
			pwriter.println(-1);
		}
		pwriter.print("minSessionTimeout=");
		pwriter.println(getMinSessionTimeout());
		pwriter.print("maxSessionTimeout=");
		pwriter.println(getMaxSessionTimeout());

		pwriter.print("serverId=");
		pwriter.println(getServerId());
	}

	/**
	 * This constructor is for backward compatibility with the existing unit
	 * test code. It wraps the two directories in a FileTxnSnapLog and uses a
	 * BasicDataTreeBuilder.
	 * 
	 * @param snapDir
	 *            first argument passed to the FileTxnSnapLog constructor
	 * @param logDir
	 *            second argument passed to the FileTxnSnapLog constructor
	 * @param tickTime
	 *            the ticktime for the server
	 * @throws IOException
	 */
	public ZooKeeperServer(File snapDir, File logDir, int tickTime)
			throws IOException {
		this(new FileTxnSnapLog(snapDir, logDir),
				tickTime,
				new BasicDataTreeBuilder());
	}

	/**
	 * Creates a server with DEFAULT_TICK_TIME, both session timeouts left
	 * unset (-1) and a new ZKDatabase built on the given transaction log
	 * factory.
	 * 
	 * @param txnLogFactory
	 *            the file transaction snapshot logging class
	 * @param treeBuilder
	 *            the datatree builder
	 * @throws IOException
	 */
	public ZooKeeperServer(FileTxnSnapLog txnLogFactory,
			DataTreeBuilder treeBuilder) throws IOException {
		this(txnLogFactory,
				DEFAULT_TICK_TIME,
				-1,
				-1,
				treeBuilder,
				new ZKDatabase(txnLogFactory));
	}

	/**
	 * Returns the database currently attached to this server.
	 * 
	 * @return the zookeeper database for this server, possibly null if none
	 *         has been set or created yet
	 */
	public ZKDatabase getZKDatabase() {
		return zkDb;
	}

	/**
	 * Replaces the database used by this zookeeper server.
	 * 
	 * @param zkDb
	 *            the database to use from now on
	 */
	public void setZKDatabase(ZKDatabase zkDb) {
		this.zkDb = zkDb;
	}

	/**
	 * Loads the database, adopts the zxid it reports as this server's
	 * current zxid, marks the data tree as initialized and then takes a
	 * snapshot.
	 * 
	 * @throws IOException
	 *             propagated from loading the database
	 * @throws InterruptedException
	 *             declared for callers; nothing visible here throws it
	 */
	public void loadData() throws IOException, InterruptedException {
		final long loadedZxid = zkDb.loadDataBase();
		setZxid(loadedZxid);

		zkDb.setDataTreeInit(true);

		// Make a clean snapshot
		takeSnapshot();
	}

	/**
	 * Saves the current data tree through the transaction log factory and
	 * then purges old files. Any IOException is treated as fatal: it is
	 * logged and the JVM exits with status 10.
	 */
	public void takeSnapshot() {
		// NOTE(review): the purge directories are hard-coded and are not
		// derived from txnLogFactory -- confirm this is intended outside the
		// original deployment.
		final File purgeDirA = new File("/var/data");
		final File purgeDirB = new File("/mnt/paxos2");
		try {
			txnLogFactory.save(zkDb.getDataTree());
			// presumably 4 is the number of snapshots to keep -- verify
			// against PurgeTxnLog
			PurgeTxnLog.purge(purgeDirA, purgeDirB, 4);
		} catch (IOException e) {
			// This is a severe error that we cannot recover from,
			// so we need to exit
			LOG.fatal("Severe unrecoverable error, exiting", e);
			System.exit(10);
		}
	}

	/**
	 * Returns the current zxid. The method synchronizes on this instance
	 * itself, so callers do not need to hold the lock.
	 * 
	 * @return the highest zxid handed out so far
	 */
	public synchronized long getZxid() {
		return this.hzxid;
	}

	/**
	 * Advances the zxid by one while holding this instance's monitor.
	 * 
	 * @return the zxid after the increment
	 */
	synchronized long getNextZxid() {
		hzxid += 1;
		return hzxid;
	}

	/**
	 * Overwrites the current zxid while holding this instance's monitor.
	 * 
	 * @param zxid
	 *            the new value
	 */
	public synchronized void setZxid(long zxid) {
		this.hzxid = zxid;
	}

	/**
	 * @return the value of System.currentTimeMillis() at the time of the call
	 */
	public long getTime() {
		final long now = System.currentTimeMillis();
		return now;
	}

	/**
	 * Called by closeSession(long); currently does nothing.
	 * 
	 * @param sessionId
	 *            id of the session being closed (unused)
	 */
	private void close(long sessionId) {
		// no-op
	}

	/**
	 * Logs that the session is being closed (id printed in hex) and hands
	 * the id to the private close hook.
	 * 
	 * @param sessionId
	 *            id of the session to close
	 */
	public void closeSession(long sessionId) {
		final String hexId = Long.toHexString(sessionId);
		LOG.info("Closing session 0x" + hexId);

		// we do not want to wait for a session close. send it as soon as we
		// detect it!
		close(sessionId);
	}

	/**
	 * IOException subtype for reporting a missing session. No active code in
	 * this class throws it.
	 */
	public static class MissingSessionException extends IOException {
		private static final long serialVersionUID = 7467414635467261007L;

		/**
		 * @param message
		 *            detail message passed on to IOException
		 */
		public MissingSessionException(String message) {
			super(message);
		}
	}

	/**
	 * Makes sure a database exists and has been loaded: creates one on the
	 * configured transaction log factory if none is set, then loads the data
	 * unless the database already reports itself as initialized.
	 * 
	 * @throws IOException
	 *             propagated from loadData()
	 * @throws InterruptedException
	 *             propagated from loadData()
	 */
	public void startdata() throws IOException, InterruptedException {
		if (zkDb == null) {
			// No database supplied by the caller: build one ourselves.
			zkDb = new ZKDatabase(txnLogFactory);
		}
		if (zkDb.isInitialized()) {
			return;
		}
		loadData();
	}

	/**
	 * Builds and starts the request processor chain, then marks the server
	 * as running and wakes every thread waiting on this instance (for
	 * example callers blocked in submitRequest).
	 */
	public void startup() {
		setupRequestProcessors();

		synchronized (this) {
			this.running = true;
			this.notifyAll();
		}
	}

	/**
	 * Wires the chain PrepRequestProcessor -> SyncRequestProcessor ->
	 * FinalRequestProcessor, starting the sync and prep processors, and
	 * publishes the prep processor as the head of the chain.
	 */
	protected void setupRequestProcessors() {
		RequestProcessor finalProcessor = new FinalRequestProcessor(this);

		SyncRequestProcessor syncProcessor = new SyncRequestProcessor(this,
				finalProcessor);
		syncProcessor.start();

		PrepRequestProcessor prepProcessor = new PrepRequestProcessor(this,
				syncProcessor);
		// Publish the head of the chain before starting it, as before.
		firstProcessor = prepProcessor;
		prepProcessor.start();
	}

	/**
	 * @return true between a call to startup() and the next call to
	 *         shutdown()
	 */
	public boolean isRunning() {
		return this.running;
	}

	/**
	 * Stops the server: clears the running flag, shuts down the head of the
	 * request processor chain if there is one, and clears the database if
	 * there is one.
	 */
	public void shutdown() {
		// Threads that poll the running flag will notice the change during
		// their next poll interval.
		running = false;

		if (null != firstProcessor) {
			firstProcessor.shutdown();
		}
		if (null != zkDb) {
			zkDb.clear();
		}
	}

	/**
	 * Adds one to the in-process request counter while holding this
	 * instance's monitor.
	 */
	public synchronized void incInProcess() {
		requestsInProcess += 1;
	}

	/**
	 * Subtracts one from the in-process request counter while holding this
	 * instance's monitor.
	 */
	public synchronized void decInProcess() {
		requestsInProcess -= 1;
	}

	/**
	 * Returns the in-process request counter.
	 * 
	 * The counter is only modified under this instance's monitor (see
	 * incInProcess / decInProcess), so it is read under the same monitor;
	 * an unsynchronized read is not guaranteed to observe the latest value.
	 * 
	 * @return the current value of the in-process request counter
	 */
	public synchronized int getInProcess() {
		return requestsInProcess;
	}

	/**
	 * @return the server id, which is always 0 in this class
	 */
	public long getServerId() {
		final long standaloneId = 0;
		return standaloneId;
	}

	/**
	 * Wraps the arguments in a Request and submits it. Note that the Request
	 * constructor takes its arguments in the order (sessionId, xid, nonce,
	 * type, bb), which differs from this method's parameter order.
	 * 
	 * @param sessionId
	 * @param type
	 * @param xid
	 * @param nonce
	 * @param bb
	 */
	private void submitRequest(long sessionId, int type, int xid, int nonce, ByteBuffer bb) {
		submitRequest(new Request(sessionId, xid, nonce, type, bb));
	}

	/**
	 * Hands a request to the head of the processor chain.
	 * 
	 * If the chain has not been set up yet, the caller blocks until the
	 * server is marked as running (see startup()). If the wait is
	 * interrupted, a warning is logged and the thread's interrupt status is
	 * restored so that callers further up the stack can still observe it.
	 * 
	 * Requests whose type is rejected by Request.isValid are logged and
	 * dropped; valid ones are passed on and counted as in-process.
	 * 
	 * @param si
	 *            the request to process
	 * @throws RuntimeException
	 *             if the processor chain is still missing once the wait is
	 *             over
	 */
	public void submitRequest(Request si) {
		if (firstProcessor == null) {
			synchronized (this) {
				try {
					// startup() sets running and calls notifyAll(); the
					// timeout just bounds each individual wait.
					while (!running) {
						wait(1000);
					}
				} catch (InterruptedException e) {
					LOG.warn("Unexpected interruption", e);
					// Do not swallow the interrupt: put the flag back.
					Thread.currentThread().interrupt();
				}
				if (firstProcessor == null) {
					throw new RuntimeException("Not started");
				}
			}
		}

		if (Request.isValid(si.type)) {
			firstProcessor.processRequest(si);
			incInProcess();
		} else {
			// if invalid packet drop the packet.
			LOG.warn("Dropping packet at server of type " + si.type);
		}
	}

	/**
	 * Deserializes the given record from the contents of the byte buffer,
	 * using a binary input archive and the tag "request".
	 * 
	 * @param bb
	 *            buffer holding the serialized form
	 * @param record
	 *            record to populate
	 * @throws IOException
	 *             propagated from deserialization
	 */
	public static void byteBuffer2Record(ByteBuffer bb, Record record)
			throws IOException {
		record.deserialize(
				BinaryInputArchive.getArchive(new ByteBufferInputStream(bb)),
				"request");
	}

	/**
	 * Reads the system property "zookeeper.snapCount".
	 * 
	 * @return the property parsed as an int, or
	 *         Configuration.snapshotInterval when the property is missing or
	 *         is not a valid int
	 */
	public static int getSnapCount() {
		final String configured = System.getProperty("zookeeper.snapCount");
		int snapCount;
		try {
			// parseInt also rejects null with a NumberFormatException
			snapCount = Integer.parseInt(configured);
		} catch (NumberFormatException e) {
			snapCount = Configuration.snapshotInterval;
		}
		return snapCount;
	}

	/**
	 * Reads the system property "zookeeper.globalOutstandingLimit".
	 * 
	 * @return the property parsed as an int, or 1000 when the property is
	 *         missing or is not a valid int
	 */
	public int getGlobalOutstandingLimit() {
		final String configured = System
				.getProperty("zookeeper.globalOutstandingLimit");
		try {
			// parseInt also rejects null with a NumberFormatException
			return Integer.parseInt(configured);
		} catch (NumberFormatException e) {
			return 1000;
		}
	}

	/**
	 * Attaches the connection factory reported by getClientPort() and
	 * dumpConf().
	 * 
	 * @param factory
	 *            the connection factory to attach
	 */
	public void setServerCnxnFactory(NIOServerCnxn.Factory factory) {
		this.serverCnxnFactory = factory;
	}

	/**
	 * @return the attached connection factory, or null if none has been set
	 */
	public NIOServerCnxn.Factory getServerCnxnFactory() {
		return this.serverCnxnFactory;
	}

	/**
	 * Returns the last processed zxid as reported by the database's data
	 * tree.
	 * 
	 * @return the last processed zxid
	 */
	public long getLastProcessedZxid() {
		final long lastProcessed = zkDb.getDataTreeLastProcessedZxid();
		return lastProcessed;
	}

	/**
	 * Returns the number of outstanding requests, i.e. requests that have
	 * been submitted but not yet processed. This is the in-process counter
	 * widened to long.
	 * 
	 * @return the in-process request count
	 */
	public long getOutstandingRequests() {
		final long outstanding = getInProcess();
		return outstanding;
	}

	/**
	 * Truncates the log to get in sync with others if in a quorum. The work
	 * is delegated to the database.
	 * 
	 * @param zxid
	 *            the zxid that it needs to get in sync with others
	 * @throws IOException
	 *             propagated from the database
	 */
	public void truncateLog(long zxid) throws IOException {
		zkDb.truncateLog(zxid);
	}

	/**
	 * @return the tick time currently configured for this server
	 */
	public int getTickTime() {
		return this.tickTime;
	}

	/**
	 * Logs the new tick time and stores it.
	 * 
	 * @param tickTime
	 *            the new tick time
	 */
	public void setTickTime(int tickTime) {
		final String message = "tickTime set to " + tickTime;
		LOG.info(message);
		this.tickTime = tickTime;
	}

	/**
	 * @return the configured minimum session timeout, or twice the tick time
	 *         when it is unset (-1)
	 */
	public int getMinSessionTimeout() {
		if (minSessionTimeout != -1) {
			return minSessionTimeout;
		}
		return tickTime * 2;
	}

	/**
	 * Logs the new minimum session timeout and stores it.
	 * 
	 * @param min
	 *            the new minimum; -1 means "unset, use default"
	 */
	public void setMinSessionTimeout(int min) {
		final String message = "minSessionTimeout set to " + min;
		LOG.info(message);
		minSessionTimeout = min;
	}

	/**
	 * @return the configured maximum session timeout, or twenty times the
	 *         tick time when it is unset (-1)
	 */
	public int getMaxSessionTimeout() {
		if (maxSessionTimeout != -1) {
			return maxSessionTimeout;
		}
		return tickTime * 20;
	}

	/**
	 * Logs the new maximum session timeout and stores it.
	 * 
	 * @param max
	 *            the new maximum; -1 means "unset, use default"
	 */
	public void setMaxSessionTimeout(int max) {
		final String message = "maxSessionTimeout set to " + max;
		LOG.info(message);
		maxSessionTimeout = max;
	}

	/**
	 * @return the local port of the connection factory's server socket, or
	 *         -1 when no connection factory is attached
	 */
	public int getClientPort() {
		if (serverCnxnFactory == null) {
			return -1;
		}
		return serverCnxnFactory.ss.socket().getLocalPort();
	}

	/**
	 * Replaces the transaction log factory used by this server.
	 * 
	 * @param txnLog
	 *            the file transaction snapshot logging class to use
	 */
	public void setTxnLogFactory(FileTxnSnapLog txnLog) {
		txnLogFactory = txnLog;
	}

	/**
	 * @return the transaction log factory currently in use, possibly null
	 */
	public FileTxnSnapLog getTxnLogFactory() {
		return txnLogFactory;
	}

	/**
	 * @return the server state, which is always "standalone" in this class
	 */
	public String getState() {
		final String state = "standalone";
		return state;
	}

}
